package hanxx.kafka.produce;

import hanxx.security.domain.SysUser;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @edit:韩鑫鑫
 * @Date: 2022/09/05/14:24
 * @Description:
 */
@Configuration
public class KafkaProducerConfig
{
    @Value(value = "8.142.123.5:9092")
    private String bootstrapAddress;

    //1. Send string to Kafka

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.RETRIES_CONFIG,10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    /*public void sendTopic(String topic,String key,String value){
        Properties props = new Properties();
        props.put(ProducerConfig.RETRIES_CONFIG,10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String,String> record = new ProducerRecord<>(topic,key,value);
        try {
            producer.send(record);
        } catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }*/

    //2. Send User objects to Kafka
    @Bean
    public ProducerFactory<String, SysUser> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, SysUser> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
}
